/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2019 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

//  $Id: tupleconstantstep.cpp 9649 2013-06-25 16:08:05Z xlou $

//#define NDEBUG
#include <cassert>
#include <sstream>
#include <iomanip>
using namespace std;

#include <boost/shared_ptr.hpp>

#include <boost/uuid/uuid_io.hpp>
using namespace boost;

#include "messagequeue.h"
using namespace messageqcpp;

#include "loggingid.h"
#include "errorcodes.h"
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;

#include "calpontsystemcatalog.h"
#include "constantcolumn.h"
using namespace execplan;

#include "jobstep.h"
#include "rowgroup.h"
using namespace rowgroup;

#include "querytele.h"
using namespace querytele;

#include "jlf_common.h"
#include "tupleconstantstep.h"

namespace joblist
{
// static utility method
SJSTEP TupleConstantStep::addConstantStep(const JobInfo& jobInfo, const rowgroup::RowGroup* rg)
{
  TupleConstantStep* tcs = NULL;

  if (jobInfo.constantCol != CONST_COL_ONLY)
  {
    tcs = new TupleConstantStep(jobInfo);
  }
  else
  {
    tcs = new TupleConstantOnlyStep(jobInfo);
  }

  tcs->initialize(jobInfo, rg);
  SJSTEP spcs(tcs);
  return spcs;
}

TupleConstantStep::TupleConstantStep(const JobInfo& jobInfo)
 : JobStep(jobInfo)
 , fRowsReturned(0)
 , fInputDL(NULL)
 , fOutputDL(NULL)
 , fInputIterator(0)
 , fRunner(0)
 , fEndOfResult(false)
{
  fExtendedInfo = "TCS: ";
  fQtc.stepParms().stepType = StepTeleStats::T_TCS;
}

TupleConstantStep::~TupleConstantStep()
{
}

void TupleConstantStep::setOutputRowGroup(const rowgroup::RowGroup& rg)
{
  throw runtime_error("Disabled, use initialize() to set output RowGroup.");
}

void TupleConstantStep::initialize(const JobInfo& jobInfo, const RowGroup* rgIn)
{
  vector<uint32_t> oids, oidsIn = fRowGroupIn.getOIDs();
  vector<uint32_t> keys, keysIn = fRowGroupIn.getKeys();
  vector<uint32_t> scale, scaleIn = fRowGroupIn.getScale();
  vector<uint32_t> precision, precisionIn = fRowGroupIn.getPrecision();
  vector<CalpontSystemCatalog::ColDataType> types, typesIn = fRowGroupIn.getColTypes();
  vector<uint32_t> csNums, csNumsIn = fRowGroupIn.getCharsetNumbers();
  vector<uint32_t> pos;
  pos.push_back(2);

  if (rgIn)
  {
    fRowGroupIn = *rgIn;
    fRowGroupIn.initRow(&fRowIn);
    oidsIn = fRowGroupIn.getOIDs();
    keysIn = fRowGroupIn.getKeys();
    scaleIn = fRowGroupIn.getScale();
    precisionIn = fRowGroupIn.getPrecision();
    typesIn = fRowGroupIn.getColTypes();
    csNumsIn = fRowGroupIn.getCharsetNumbers();
  }

  for (uint64_t i = 0, j = 0; i < jobInfo.deliveredCols.size(); i++)
  {
    const ConstantColumn* cc = dynamic_cast<const ConstantColumn*>(jobInfo.deliveredCols[i].get());

    if (cc != NULL)
    {
      CalpontSystemCatalog::ColType ct = cc->resultType();

      if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
        ct.colWidth++;

      // Round colWidth up
      if (ct.colWidth == 3)
        ct.colWidth = 4;
      else if (ct.colWidth == 5 || ct.colWidth == 6 || ct.colWidth == 7)
        ct.colWidth = 8;

      oids.push_back(-1);
      keys.push_back(-1);
      scale.push_back(ct.scale);
      precision.push_back(ct.precision);
      types.push_back(ct.colDataType);
      csNums.push_back(ct.charsetNumber);
      pos.push_back(pos.back() + ct.colWidth);

      fIndexConst.push_back(i);
    }
    else
    {
      // select (select a) from region;
      if (j >= oidsIn.size() && jobInfo.tableList.empty())
      {
        throw IDBExcept(ERR_NO_FROM);
      }

      idbassert(j < oidsIn.size());

      oids.push_back(oidsIn[j]);
      keys.push_back(keysIn[j]);
      scale.push_back(scaleIn[j]);
      precision.push_back(precisionIn[j]);
      types.push_back(typesIn[j]);
      csNums.push_back(csNumsIn[j]);
      pos.push_back(pos.back() + fRowGroupIn.getColumnWidth(j));
      j++;

      fIndexMapping.push_back(i);
    }
  }

  fRowGroupOut =
      RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision, jobInfo.stringTableThreshold);
  fRowGroupOut.initRow(&fRowOut);
  fRowGroupOut.initRow(&fRowConst, true);

  constructContanstRow(jobInfo);
}

void TupleConstantStep::constructContanstRow(const JobInfo& jobInfo)
{
  // construct a row with only the constant values
  fConstRowData.reset(new uint8_t[fRowConst.getSize()]);
  fRowConst.setData(rowgroup::Row::Pointer(fConstRowData.get()));
  fRowConst.initToNull();  // make sure every col is init'd to something, because later we copy the whole row
  const vector<CalpontSystemCatalog::ColDataType>& types = fRowGroupOut.getColTypes();

  for (vector<uint64_t>::iterator i = fIndexConst.begin(); i != fIndexConst.end(); i++)
  {
    const ConstantColumn* cc = dynamic_cast<const ConstantColumn*>(jobInfo.deliveredCols[*i].get());
    const execplan::Result c = cc->result();

    if (cc->isNull())
    {
      if (types[*i] == CalpontSystemCatalog::CHAR || types[*i] == CalpontSystemCatalog::VARCHAR ||
          types[*i] == CalpontSystemCatalog::TEXT)
      {
        fRowConst.setStringField(nullptr, 0, *i);
      }
      else if (isUnsigned(types[*i]))
      {
        fRowConst.setUintField(fRowConst.getNullValue(*i), *i);
      }
      else
      {
        fRowConst.setIntField(fRowConst.getSignedNullValue(*i), *i);
      }

      continue;
    }

    switch (types[*i])
    {
      case CalpontSystemCatalog::TINYINT:
      case CalpontSystemCatalog::SMALLINT:
      case CalpontSystemCatalog::MEDINT:
      case CalpontSystemCatalog::INT:
      case CalpontSystemCatalog::BIGINT:
      case CalpontSystemCatalog::DATE:
      case CalpontSystemCatalog::DATETIME:
      case CalpontSystemCatalog::TIME:
      case CalpontSystemCatalog::TIMESTAMP:
      {
        fRowConst.setIntField(c.intVal, *i);
        break;
      }

      case CalpontSystemCatalog::DECIMAL:
      case CalpontSystemCatalog::UDECIMAL:
      {
        if (fRowGroupOut.getColWidths()[*i] > datatypes::MAXLEGACYWIDTH)
          fRowConst.setInt128Field(c.decimalVal.TSInt128::getValue(), *i);
        else
          fRowConst.setIntField(c.decimalVal.value, *i);
        break;
      }

      case CalpontSystemCatalog::FLOAT:
      case CalpontSystemCatalog::UFLOAT:
      {
        fRowConst.setFloatField(c.floatVal, *i);
        break;
      }

      case CalpontSystemCatalog::DOUBLE:
      case CalpontSystemCatalog::UDOUBLE:
      {
        fRowConst.setDoubleField(c.doubleVal, *i);
        break;
      }

      case CalpontSystemCatalog::LONGDOUBLE:
      {
        fRowConst.setLongDoubleField(c.longDoubleVal, *i);
        break;
      }

      case CalpontSystemCatalog::CHAR:
      case CalpontSystemCatalog::VARCHAR:
      case CalpontSystemCatalog::TEXT:
      {
        fRowConst.setStringField(c.strVal, *i);
        break;
      }

      case CalpontSystemCatalog::UTINYINT:
      case CalpontSystemCatalog::USMALLINT:
      case CalpontSystemCatalog::UMEDINT:
      case CalpontSystemCatalog::UINT:
      case CalpontSystemCatalog::UBIGINT:
      {
        fRowConst.setUintField(c.uintVal, *i);
        break;
      }

      default:
      {
        throw runtime_error("un-supported constant column type.");
        break;
      }
    }  // switch
  }    // for constant columns
}

void TupleConstantStep::run()
{
  if (fInputJobStepAssociation.outSize() == 0)
  {
    throw logic_error("No input data list for constant step.");
  }

  fInputDL = fInputJobStepAssociation.outAt(0)->rowGroupDL();

  if (fInputDL == NULL)
    throw logic_error("Input is not a RowGroup data list.");

  fInputIterator = fInputDL->getIterator();

  if (fDelivery == false)
  {
    if (fOutputJobStepAssociation.outSize() == 0)
      throw logic_error("No output data list for non-delivery constant step.");

    fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();

    if (fOutputDL == NULL)
      throw logic_error("Output is not a RowGroup data list.");

    fRunner = jobstepThreadPool.invoke(Runner(this));
  }
}

void TupleConstantStep::join()
{
  if (fRunner)
    jobstepThreadPool.join(fRunner);
}

uint32_t TupleConstantStep::nextBand(messageqcpp::ByteStream& bs)
{
  RGData rgDataIn;
  RGData rgDataOut;
  bool more = false;
  uint32_t rowCount = 0;

  try
  {
    bs.restart();

    more = fInputDL->next(fInputIterator, &rgDataIn);

    if (traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
      dlTimes.setFirstReadTime();

    if (!more && cancelled())
    {
      fEndOfResult = true;
    }

    if (more && !fEndOfResult)
    {
      fRowGroupIn.setData(&rgDataIn);
      rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
      fRowGroupOut.setData(&rgDataOut);

      fillInConstants();
      fRowGroupOut.serializeRGData(bs);
      rowCount = fRowGroupOut.getRowCount();
    }
    else
    {
      fEndOfResult = true;
    }
  }
  catch (...)
  {
    handleException(std::current_exception(), logging::tupleConstantStepErr, logging::ERR_ALWAYS_CRITICAL,
                    "TupleConstantStep::nextBand()");
    while (more)
      more = fInputDL->next(fInputIterator, &rgDataIn);

    fEndOfResult = true;
  }

  if (fEndOfResult)
  {
    // send an empty / error band
    RGData rgData(fRowGroupOut, 0);
    fRowGroupOut.setData(&rgData);
    fRowGroupOut.resetRowGroup(0);
    fRowGroupOut.setStatus(status());
    fRowGroupOut.serializeRGData(bs);

    if (traceOn())
    {
      dlTimes.setLastReadTime();
      dlTimes.setEndOfInputTime();
      printCalTrace();
    }
  }

  return rowCount;
}

void TupleConstantStep::execute()
{
  RGData rgDataIn;
  RGData rgDataOut;
  bool more = false;
  StepTeleStats sts;
  sts.query_uuid = fQueryUuid;
  sts.step_uuid = fStepUuid;

  try
  {
    more = fInputDL->next(fInputIterator, &rgDataIn);

    if (traceOn())
      dlTimes.setFirstReadTime();

    sts.msg_type = StepTeleStats::ST_START;
    sts.total_units_of_work = 1;
    postStepStartTele(sts);

    if (!more && cancelled())
    {
      fEndOfResult = true;
    }

    while (more && !fEndOfResult)
    {
      fRowGroupIn.setData(&rgDataIn);
      rgDataOut.reinit(fRowGroupOut, fRowGroupIn.getRowCount());
      fRowGroupOut.setData(&rgDataOut);

      fillInConstants();

      more = fInputDL->next(fInputIterator, &rgDataIn);

      if (cancelled())
      {
        fEndOfResult = true;
      }
      else
      {
        fOutputDL->insert(rgDataOut);
      }
    }
  }
  catch (...)
  {
    handleException(std::current_exception(), logging::tupleConstantStepErr, logging::ERR_ALWAYS_CRITICAL,
                    "TupleConstantStep::execute()");
  }

  while (more)
    more = fInputDL->next(fInputIterator, &rgDataIn);

  sts.msg_type = StepTeleStats::ST_SUMMARY;
  sts.total_units_of_work = sts.units_of_work_completed = 1;
  sts.rows = fRowsReturned;
  postStepSummaryTele(sts);

  // Bug 3136, let mini stats to be formatted if traceOn.
  if (traceOn())
  {
    dlTimes.setLastReadTime();
    dlTimes.setEndOfInputTime();
    printCalTrace();
  }

  fEndOfResult = true;
  fOutputDL->endOfInput();
}

// *DRRTUY Copy row at once not one field at a time
void TupleConstantStep::fillInConstants()
{
  fRowGroupIn.getRow(0, &fRowIn);
  fRowGroupOut.getRow(0, &fRowOut);

  if (fIndexConst.size() > 1 || fIndexConst[0] != 0)
  {
    for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
    {
      copyRow(fRowConst, &fRowOut);

      fRowOut.setRid(fRowIn.getRelRid());

      for (uint64_t j = 0; j < fIndexMapping.size(); ++j)
        fRowIn.copyField(fRowOut, fIndexMapping[j], j);

      fRowIn.nextRow();
      fRowOut.nextRow();
    }
  }
  else  // only first column is constant
  {
    for (uint64_t i = 0; i < fRowGroupIn.getRowCount(); ++i)
    {
      fRowOut.setRid(fRowIn.getRelRid());
      fRowConst.copyField(fRowOut, 0, 0);

      for (uint32_t i = 1; i < fRowOut.getColumnCount(); i++)
        fRowIn.copyField(fRowOut, i, i - 1);

      fRowIn.nextRow();
      fRowOut.nextRow();
    }
  }

  fRowGroupOut.resetRowGroup(fRowGroupIn.getBaseRid());
  fRowGroupOut.setRowCount(fRowGroupIn.getRowCount());
  fRowsReturned += fRowGroupOut.getRowCount();
}

void TupleConstantStep::fillInConstants(const rowgroup::Row& rowIn, rowgroup::Row& rowOut)
{
  if (fIndexConst.size() > 1 || fIndexConst[0] != 0)
  {
    copyRow(fRowConst, &rowOut);
    rowOut.setRid(rowIn.getRelRid());

    for (uint64_t j = 0; j < fIndexMapping.size(); ++j)
      rowIn.copyField(rowOut, fIndexMapping[j], j);
  }
  else  // only first column is constant
  {
    rowOut.setRid(rowIn.getRelRid());
    fRowConst.copyField(rowOut, 0, 0);

    for (uint32_t i = 1; i < rowOut.getColumnCount(); i++)
      rowIn.copyField(rowOut, i, i - 1);
  }
}

const RowGroup& TupleConstantStep::getOutputRowGroup() const
{
  return fRowGroupOut;
}

const RowGroup& TupleConstantStep::getDeliveredRowGroup() const
{
  return fRowGroupOut;
}

void TupleConstantStep::deliverStringTableRowGroup(bool b)
{
  fRowGroupOut.setUseStringTable(b);
}

bool TupleConstantStep::deliverStringTableRowGroup() const
{
  return fRowGroupOut.usesStringTable();
}

const string TupleConstantStep::toString() const
{
  ostringstream oss;
  oss << "ConstantStep   ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;

  oss << " in:";

  for (unsigned i = 0; i < fInputJobStepAssociation.outSize(); i++)
    oss << fInputJobStepAssociation.outAt(i);

  oss << " out:";

  for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
    oss << fOutputJobStepAssociation.outAt(i);

  oss << endl;

  return oss.str();
}

void TupleConstantStep::printCalTrace()
{
  time_t t = time(0);
  char timeString[50];
  ctime_r(&t, timeString);
  timeString[strlen(timeString) - 1] = '\0';
  ostringstream logStr;
  logStr << "ses:" << fSessionId << " st: " << fStepId << " finished at " << timeString
         << "; total rows returned-" << fRowsReturned << endl
         << "\t1st read " << dlTimes.FirstReadTimeString() << "; EOI " << dlTimes.EndOfInputTimeString()
         << "; runtime-" << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime())
         << "s;\n\tUUID " << uuids::to_string(fStepUuid) << endl
         << "\tJob completion status " << status() << endl;
  logEnd(logStr.str().c_str());
  fExtendedInfo += logStr.str();
  formatMiniStats();
}

void TupleConstantStep::formatMiniStats()
{
  ostringstream oss;
  oss << "TCS "
      << "UM "
      << "- "
      << "- "
      << "- "
      << "- "
      << "- "
      << "- " << JSTimeStamp::tsdiffstr(dlTimes.EndOfInputTime(), dlTimes.FirstReadTime()) << " "
      << fRowsReturned << " ";
  fMiniInfo += oss.str();
}

// class TupleConstantOnlyStep
TupleConstantOnlyStep::TupleConstantOnlyStep(const JobInfo& jobInfo)
  : TupleConstantStep(jobInfo)
  , fEmptySet(jobInfo.constantFalse)
{
  //	fExtendedInfo = "TCOS: ";
}

TupleConstantOnlyStep::~TupleConstantOnlyStep()
{
}

// void TupleConstantOnlyStep::initialize(const RowGroup& rgIn, const JobInfo& jobInfo)
void TupleConstantOnlyStep::initialize(const JobInfo& jobInfo, const rowgroup::RowGroup* rgIn)
{
  vector<uint32_t> oids;
  vector<uint32_t> keys;
  vector<uint32_t> scale;
  vector<uint32_t> precision;
  vector<CalpontSystemCatalog::ColDataType> types;
  vector<uint32_t> csNums;
  vector<uint32_t> pos;
  pos.push_back(2);

  deliverStringTableRowGroup(false);

  for (uint64_t i = 0; i < jobInfo.deliveredCols.size(); i++)
  {
    const ConstantColumn* cc = dynamic_cast<const ConstantColumn*>(jobInfo.deliveredCols[i].get());

    if (cc == NULL)
      throw runtime_error("none constant column found.");

    CalpontSystemCatalog::ColType ct = cc->resultType();

    if (ct.colDataType == CalpontSystemCatalog::VARCHAR)
      ct.colWidth++;

    // Round colWidth up
    if (ct.colWidth == 3)
      ct.colWidth = 4;
    else if (ct.colWidth == 5 || ct.colWidth == 6 || ct.colWidth == 7)
      ct.colWidth = 8;

    oids.push_back(-1);
    keys.push_back(-1);
    scale.push_back(ct.scale);
    precision.push_back(ct.precision);
    types.push_back(ct.colDataType);
    csNums.push_back(ct.charsetNumber);
    pos.push_back(pos.back() + ct.colWidth);

    fIndexConst.push_back(i);
  }

  fRowGroupOut = RowGroup(oids.size(), pos, oids, keys, types, csNums, scale, precision,
                          jobInfo.stringTableThreshold, false);
  fRowGroupOut.initRow(&fRowOut);
  fRowGroupOut.initRow(&fRowConst, true);

  constructContanstRow(jobInfo);
}

void TupleConstantOnlyStep::run()
{
  if (fDelivery == false)
  {
    if (fOutputJobStepAssociation.outSize() == 0)
      throw logic_error("No output data list for non-delivery constant step.");

    fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();

    if (fOutputDL == NULL)
      throw logic_error("Output is not a RowGroup data list.");

    try
    {
      RGData rgDataOut(fRowGroupOut, 1);
      fRowGroupOut.setData(&rgDataOut);

      if (traceOn())
        dlTimes.setFirstReadTime();

      fillInConstants();

      if (!fEmptySet)
      {
        fOutputDL->insert(rgDataOut);
      }
    }
    catch (...)
    {
      handleException(std::current_exception(), logging::tupleConstantStepErr, logging::ERR_ALWAYS_CRITICAL,
                      "TupleConstantOnlyStep::run()");
    }

    if (traceOn())
    {
      dlTimes.setLastReadTime();
      dlTimes.setEndOfInputTime();
      printCalTrace();
    }

    // Bug 3136, let mini stats to be formatted if traceOn.
    fEndOfResult = true;
    fOutputDL->endOfInput();
  }
}

uint32_t TupleConstantOnlyStep::nextBand(messageqcpp::ByteStream& bs)
{
  RGData rgDataOut;
  uint32_t rowCount = 0;

  if (!fEndOfResult)
  {
    try
    {
      bs.restart();

      if (traceOn() && dlTimes.FirstReadTime().tv_sec == 0)
        dlTimes.setFirstReadTime();

      rgDataOut.reinit(fRowGroupOut, 1);
      fRowGroupOut.setData(&rgDataOut);

      fillInConstants();
      fRowGroupOut.serializeRGData(bs);
      rowCount = fRowGroupOut.getRowCount();
    }
    catch (...)
    {
      handleException(std::current_exception(), logging::tupleConstantStepErr, logging::ERR_ALWAYS_CRITICAL,
                      "TupleConstantOnlyStep::nextBand()");
    }

    fEndOfResult = true;
  }
  else
  {
    // send an empty / error band
    RGData rgData(fRowGroupOut, 0);
    fRowGroupOut.setData(&rgData);
    fRowGroupOut.resetRowGroup(0);
    fRowGroupOut.setStatus(status());
    fRowGroupOut.serializeRGData(bs);

    if (traceOn())
    {
      dlTimes.setLastReadTime();
      dlTimes.setEndOfInputTime();
      printCalTrace();
    }
  }

  return rowCount;
}

void TupleConstantOnlyStep::fillInConstants()
{
  fRowGroupOut.getRow(0, &fRowOut);
  idbassert(fRowConst.getColumnCount() == fRowOut.getColumnCount());
  fRowOut.usesStringTable(fRowConst.usesStringTable());
  copyRow(fRowConst, &fRowOut);
  fRowGroupOut.resetRowGroup(0);
  fRowGroupOut.setRowCount(1);
  fRowsReturned = 1;
}

const string TupleConstantOnlyStep::toString() const
{
  ostringstream oss;
  oss << "ConstantOnlyStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;

  oss << " out:";

  for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
    oss << fOutputJobStepAssociation.outAt(i);

  oss << endl;

  return oss.str();
}

// class TupleConstantBooleanStep
TupleConstantBooleanStep::TupleConstantBooleanStep(const JobInfo& jobInfo, bool value)
 : TupleConstantStep(jobInfo), fValue(value)
{
  //	fExtendedInfo = "TCBS: ";
}

TupleConstantBooleanStep::~TupleConstantBooleanStep()
{
}

void TupleConstantBooleanStep::initialize(const RowGroup& rgIn, const JobInfo&)
{
  fRowGroupOut = rgIn;
  fRowGroupOut.initRow(&fRowOut);
  fRowGroupOut.initRow(&fRowConst, true);
}

void TupleConstantBooleanStep::run()
{
  if (fDelivery == false)
  {
    if (fOutputJobStepAssociation.outSize() == 0)
      throw logic_error("No output data list for non-delivery constant step.");

    fOutputDL = fOutputJobStepAssociation.outAt(0)->rowGroupDL();

    if (fOutputDL == NULL)
      throw logic_error("Output is not a RowGroup data list.");

    if (traceOn())
    {
      dlTimes.setFirstReadTime();
      dlTimes.setLastReadTime();
      dlTimes.setEndOfInputTime();
      printCalTrace();
    }

    // Bug 3136, let mini stats to be formatted if traceOn.
    fOutputDL->endOfInput();
  }
}

uint32_t TupleConstantBooleanStep::nextBand(messageqcpp::ByteStream& bs)
{
  // send an empty band
  RGData rgData(fRowGroupOut, 0);
  fRowGroupOut.setData(&rgData);
  fRowGroupOut.resetRowGroup(0);
  fRowGroupOut.setStatus(status());
  fRowGroupOut.serializeRGData(bs);

  if (traceOn())
  {
    dlTimes.setFirstReadTime();
    dlTimes.setLastReadTime();
    dlTimes.setEndOfInputTime();
    printCalTrace();
  }

  return 0;
}

const string TupleConstantBooleanStep::toString() const
{
  ostringstream oss;
  oss << "ConstantBooleanStep ses:" << fSessionId << " txn:" << fTxnId << " st:" << fStepId;

  oss << " out:";

  for (unsigned i = 0; i < fOutputJobStepAssociation.outSize(); i++)
    oss << fOutputJobStepAssociation.outAt(i);

  oss << endl;

  return oss.str();
}

}  // namespace joblist
